-- @author lixiyong01
-- @date 2023.12.28
-- 读取数据信息分行显示
-- 'scan.startup.mode'='earliest-offset',


-- source
create table hdp_lbg_supin_ods_zp_n_star_app_log (
    content string comment '使用$$分割的字符串'
)
with(
'connector' = 'kafka',
'topic' = 'hdp_lbg_supin_star_app_log',
'properties.bootstrap.servers' = '10.178.10.8:9092,10.178.10.10:9092,10.178.10.11:9092,10.178.10.12:9092,10.178.10.13:9092',
'scan.startup.mode' = 'group-offsets',
'format' = 'raw',
'properties.client.id' = 'hdp_lbg_supin-hdp_lbg_supin_star_app_log-BQxsW',
'properties.group.id' = 'hdp_lbg_supin-hdp_lbg_supin_star_app_log-BQxsW-www'
);


-- sink
create table hdp_lbg_supin_dwd_zp_n_star_app_log (
    ver string comment 'v0.0.1',
    ts string comment '事件时间格式yyyy-MM-dd HH:mm:ss.SSS',
    host string comment '服务地址',
    log_level string comment '日志级别',
    service_name string comment '服务名称',
    thread_name string comment '线程名称',
    code_file string comment '代码文件',
    code_line string comment '代码行数',
    code_method string comment '代码方法',
    code_class string comment '代码类',
    trace_id string comment 'traceId',
    msg_content string comment '消息内容'
)
with(
'connector' = 'kafka',
'topic' = 'hdp_lbg_supin_dwd_zp_n_star_app_log',
'properties.bootstrap.servers' = '10.178.10.8:9092,10.178.10.10:9092,10.178.10.11:9092,10.178.10.12:9092,10.178.10.13:9092',
'scan.startup.mode' = 'group-offsets',
'format' = 'json'
);


-- sql
set table.dynamic-table-options.enabled=true;
create function SplitFunction AS 'udtf.SplitFunction';

insert into hdp_lbg_supin_dwd_zp_n_star_app_log /*+ OPTIONS(
'properties.client.id' = 'hdp_lbg_supin-hdp_lbg_supin_dwd_zp_n_star_app_log-01hRX') */
select 'v0.0.1' as ver,
    split_index(content,'$$',0) as ts,
    split_index(content,'$$',1) as host,
    split_index(content,'$$',2) as log_level,
    split_index(content,'$$',3) as service_name,
    split_index(content,'$$',4) as thread_name,
    split_index(content,'$$',5) as code_file,
    split_index(content,'$$',6) as code_line,
    split_index(content,'$$',7) as code_method,
    split_index(content,'$$',8) as code_class,
    split_index(content,'$$',9) as trace_id,
    split_index(content,'$$',10) as msg_content,
    regex_str as raw_content
from hdp_lbg_supin_ods_zp_n_star_app_log /*+ OPTIONS(
'properties.client.id' = 'hdp_lbg_supin-hdp_lbg_supin_star_app_log-BQxsW',
'properties.group.id' = 'hdp_lbg_supin-hdp_lbg_supin_star_app_log-BQxsW-dev') */,
lateral table(SplitFunction(content,'\n')) as t(regex_str);


